【275期】从一个消费慢的例子深入理解 kafka rebalance
点击上方“Java精选”,选择“设为星标”
别问别人为什么,多问自己凭什么!
下方有惊喜,留言必回,有问必答!
每一天进步一点点,是成功的开始...
前 言
我们能清楚的看到整个消费组在消费异常的时间段内经常出现消费停滞的情况如图上消费速率为0。
为什么消费会卡主呢?同时去看了相关服务的日志看到很多err kafka data maybe rebalancing。看了这篇文章后消费卡主的问题自然就知道答案了。
重要概念
为了说清楚 rebalance 有必要把最相关的重要概念回顾一下
| Consumer Group
consumer group是kafka提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。
1. consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程
2. group.id是一个字符串,唯一标识一个consumer group
3. consumer group订阅的topic下的每个分区只能分配给某个group下的一个consumer (当然该分区还可以被分配给其他group)
| Coordinator
推荐 Spring boot 的通用后台管理系统项目:
https://gitee.com/yoodb/jing-xuan
partition-Id(__consumer_offsets) = Math.abs(groupId.hashCode()%groupMetadataTopicPartitionCount)
Rebalance 目的
我们知道topic的partition已经根据策略分配给了consumer group下的各个consumer。
那么当有新的consumer加入或者老的consumer离开这个partition与consumer的分配关系就会发生变化,如果这个时候不进行重新调配,就可能出现新consumer无partition消费或者有partition无消费者的情况。
那么这个重新调配指的就是consumer和partition rebalance。
Rebalance 时机
Rebalance在以下情况会触发
Rebalance 过程
kafka中的重要设计也会随着版本的升级而优化。rebalance也不例外,这里我们介绍的kafka rebalance流程以我们的线上版本1.1.1为例。
推荐 Spring Cloud 分布式通用架构项目:
https://gitee.com/yoodb/jingxuan-springcloud
Rebalance 问题
Rebalance 改进
| Static Membership
这样的话,在使用Static Membership场景下,只要在consumer重新启动的时候,不发送LeaveGroup Request且在session.timeout.ms时长内重启成功,就不会触发rebalance。所以,这里推荐设置一个足够consumer重启的时长session.timeout.ms,这样能有效降低因consumer短暂不可用导致的reblance次数。
| Incremental Cooperative Rebalancing
从名字中我们就能看出这个版本的rebalance过程两个关键词增量和协作,增量指的是原先版本的rebalance被分解成了多次小规模的rebalance, 协作自然指的是consumer 之间的关系。核心思想:
1. consumer比较新旧两个partition分配结果,只停止消费回收(revoke)的partition,对于两次都分配给自己的partition,consumer不需要停止消费
2. 通过多轮的局部 rebalance 来最终实现全局的 rebalance
我们以文章开始的例子来理解一下这个版本的改进
首先C1 -> {P0, p3}C2 -> {P1} C3 -> {P2}这是consumer和partition的分配关系,我们假设C2宕机超过了session.timeout.ms, 此时GroupCoordinator会触发第一轮 rebalance
| 第一轮 Rebalance
1. GroupCoordinator会在下一轮心跳响应中通知C1和C3发起第一轮rebalance
2. C1和C3会将自己当前正在处理的partition信息封装到JoinGroupRequest中(metadata字段)发往GroupCoordinator:
- C1 发送的 JoinGroupRequest(assigned: P0、P3)
- C3 发送的 JoinGroupRequest(assigned: P2)
3. 假设GroupCoordinator在这里选择1作为Group Leader,GroupCoordinator会将 partition目前的分配状态通过JoinGroupResponse发送给C1
4. C1发现P1并未出现(处于lost状态),此时C1并不会立即解决当前的不平衡问题,返回的partition分配结果不变(同时会携带一个delay时间,scheduled.rebalance.max.delay.ms,默认5分钟)。
GroupCoordinator据C1的SyncGroupRequest,生成SyncGroupResponse返回给两个存活的consumer
- C1 收到的SyncGroup Response(delay,assigned: P0、P3,revoked:)
- C3 收到的SyncGroup Response(delay,assigned: P2,revoked:)
到此为止,第一轮rebalance结束。整个rebalance过程中,C1和C3并不会停止消费。
| 第二轮 Rebalance
1. 在scheduled.rebalance.max.delay.ms这个时间段内,C2故障恢复,重新加入到 consumer group时,会向GroupCoordinator发送JoinGroup Request,触发第二轮的rebalance。
GroupCoordinator在下一次心跳响应中会通知C1和C3参与第二轮rebalance
2. C1和C3收到心跳后,发送JoinGroupRequest参与第二轮rebalance:
- C1 发送的JoinGroupRequest(assigned: P0、P3)
- C3 发送的JoinGroupRequest(assigned: P2)
3. 本轮中,C1依旧被选为Group Leader,它检查delay时间(scheduled.rebalance.max.delay.ms)是否已经到了,如果没到,则依旧不会立即解决当前的不平衡问题,继续返回目前的分配结果,并且返回的SyncGroupResponse 中更新了delay的剩余时间(remaining delay = delay - pass_time) 到此为止,第二轮 rebalance结束。
整个rebalance过程中,C1和C3并不会停止消费。
- C1收到的SyncGroup Response(remaining delay,assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(remaining delay,assigned:,revoked:)
- C3收到的SyncGroup Response(remaining delay,assigned:P2,revoked:)
| 第三轮 Rebalance
1. 当remaining delay时间到期之后,consumer全部重新送JoinGroupRequest,触发第三轮rebalance
- C1 发送的JoinGroupRequest(assigned: P0、P3)
- C2 发送的JoinGroupRequest(assigned: )
- C3 发送的JoinGroupRequest(assigned: P2)
2. 在此次rebalance中,C1依旧被选为Group Leader,它会发现delay已经到期了,开始解决不平衡的问题,对partition进行重新分配。最新的分配结果最终通过SyncGroupResponse返回到各个consumer:
到此为止,第三轮rebalance结束。整个rebalance过程中,C1和C3的消费都不会停止
- C1收到的SyncGroup Response(assigned:P0、P3,revoked:)
- C2收到的SyncGroup Response(assigned:P1,revoked:)
- C3收到的SyncGroup Response(assigned:P2,revoked:)
作者:lxkaka
https://lxkaka.wang/kafka-rebalance/
公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!
3000+ 道面试题在线刷,最新、最全 Java 面试题!
【267期】Spring Boot 如何整合 WebSocket 实现群聊,一对一聊天?
【268期】不小心将测试代码提交到生产环境,教你 6 种方法秒解决!
【269期】面试官必问:说说 Spring Bean 的实例化过程?
【270期】面试官问:Thread.sleep(0) 到底有什么用?谈谈你的看法!
【271期】16 个 Redis 常见使用场景,面试有内容聊啦!
【272期】使用 Jenkins 部署码云上的 Spring Boot 项目
最近有很多人问,有没有读者交流群!想知道如何加入?方式很简单,兴趣相投的朋友,只需要点击下方卡片,回复“加群”,即可无套路入交流群!